package org.snake.nebulae.core.task;

import cn.hutool.core.date.DateUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.snake.nebulae.core.model.ServiceInfo;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
 * 心跳维持任务
 */
@Slf4j
public class HeartBeatTask implements Runnable {

    /**
     * 任务启动时间点
     */
    private CompletableFuture<IMqttToken> doPoint;

    /**
     * 任务启动线程池
     */
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    /**
     * 具体服务信息
     */
    private ServiceInfo serviceInfo;

    /**
     * mqtt异步客户端
     */
    private MqttAsyncClient mqttAsyncClient;

    /**
     * json工具
     */
    private ObjectMapper objectMapper;

    /**
     * 延迟时间
     */
    private long delayTime = 5000;

    public HeartBeatTask(CompletableFuture<IMqttToken> doPoint,
                         ThreadPoolTaskScheduler threadPoolTaskScheduler,
                         MqttAsyncClient mqttAsyncClient,
                         ObjectMapper objectMapper,
                         ServiceInfo serviceInfo) {
        this.doPoint = doPoint;
        this.threadPoolTaskScheduler = threadPoolTaskScheduler;
        this.mqttAsyncClient = mqttAsyncClient;
        this.objectMapper = objectMapper;
        this.serviceInfo = serviceInfo;

        Consumer<IMqttToken> bindingTaskToThreadPool = iMqttToken -> {

            this.threadPoolTaskScheduler.scheduleWithFixedDelay(this, delayTime);
            log.info("{} 服务心跳维持任务启动完毕 时间是 {} 延迟时间是 {} 服务是 {}", this.getClass(), DateUtil.now(), delayTime, serviceInfo);
        };
        this.doPoint.thenAcceptAsync(bindingTaskToThreadPool);
    }

    @SneakyThrows
    @Override
    public void run() {
        // 更新服务心跳时间戳
        serviceInfo.setLastHeartBeatTime(System.currentTimeMillis());

        // 往服务注册主题更新服务信息
        String data = objectMapper.writeValueAsString(serviceInfo);
        mqttAsyncClient.publish(CoreTask.topicServices, data.getBytes(), 2, false);

        log.info("{} 执行一次服务心跳任务 时间是 {} 延迟时间是 {} 服务是 {}", this.getClass(), DateUtil.now(), delayTime, serviceInfo);
    }
}
